package com.niit.rdd

import org.apache.spark.{SparkConf, SparkContext}

object SparkRDD_Action_01 {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
    val sc = new SparkContext(sparkConf)
    sc.setLogLevel("ERROR")

    val rdd1 =  sc.makeRDD( List(1,2,3,4),2 )

    /*
    reduce:聚集RDD中的所有元素，先聚合分区内数据，再聚合分区间数据
     */
    val i:Int =  rdd1.reduce(_ + _)
    println(i)

    /*
      collect:方法会将不同分区的数据，按照分区顺序采集到Driver内存中，形成数组
     */
    val arrs:Array[Int] = rdd1.collect()
    println(arrs.mkString(","))

    /*
    count:返回RDD的长度
     */
    val cnt =  rdd1.count();
    println(cnt)

    /*
    first:返回RDD中的第一个元素
     */
    val f=  rdd1.first()
    println(f)

    /*
    take：返回一个由RDD的前n个元素组成的数组
     */
    val arrs2:Array[Int] =  rdd1.take(3);
    println(arrs2.mkString(","))
    /*
      takeOrder:数据排序后，取前n个数据
     */
    val rdd2 =  sc.makeRDD( List(3,1,2,4),2 )
    val arrs3:Array[Int] = rdd2.takeOrdered(3)
    println(arrs3.mkString(","))
    /*
    aggregate:分区的数据通过初始值和分区内的数据进行聚合，然后再和初始值进行分区间的数据聚合
     */
    // 分区内：[1,2,10] 13    [3,4,10]  17
    // 分区间：13  +  17 + 10
    //初始值会参与分区内计算，并且也参与分区间计算
    val res1 =  rdd1.aggregate(10)(_+_,_+_)
    println(res1)
    //如果分区内和分区间计算规则相同时，可以使用fold来代替
    val res2 = rdd1.fold(10)(_+_)

    /*
      countByKey:统计每种key的个数
     */
    val rdd3 =  sc.makeRDD( List( ("a",1),("a",2),("b",3),("c",4) ) )
    val counRdd:collection.Map[String,Long] =  rdd3.countByKey()
    println(counRdd)

    /*
    foreach
     */
    println("-----------------------")
    //是Driver端内存集合的循环遍历方法
    rdd1.collect().foreach(println)
    println("-----------------------")
    //是EXecutor端 数据打印
    rdd1.foreach(println)

    /*
      save:
        saveAsTextFile:任意类型的数据
        saveAsObjectFile:任意类型的数据
        saveAsSequenceFile:法要求数据格式必须k-v类型
     */
    val rdd4 =  sc.makeRDD( List( ("a",1),("a",2),("b",3),("c",4) ) )
    //rdd4.saveAsTextFile("output4")
    //将数据转成对象字节码进行存储
    //rdd4.saveAsObjectFile("output5")
    //saveAsSequenceFile方法要求数据格式必须k-v类型
    rdd4.saveAsSequenceFile("output6")

    sc.stop()


  }

}
